package com.dahuan.time;

import com.dahuan.bean.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Event-time windowing example.
 *
 * <p>Reads comma-separated text lines from a socket on {@code localhost:7777},
 * converts each line into a {@link SensorReading}, assigns event-time timestamps
 * and watermarks that tolerate bounded out-of-orderness, and then prints, per
 * sensor id, the reading with the lowest temperature in each 5-second window.
 */
public class Window_EventTime {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Run the job on event time rather than processing time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Interval between automatically emitted watermarks, in milliseconds.
        env.getConfig().setAutoWatermarkInterval(100);

        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // Each line is split on "," into three fields that are passed, in order,
        // to the SensorReading constructor (string, long, double).
        // A line with fewer than three fields or non-numeric values throws and fails the job.
        DataStream<SensorReading> dataStream = inputStream
                .map(line -> {
                    String[] fields = line.split(",");
                    // parseLong/parseDouble replace the deprecated new Long(String)/new Double(String).
                    return new SensorReading(
                            fields[0],
                            Long.parseLong(fields[1]),
                            Double.parseDouble(fields[2]));
                })
                // Out-of-order data: the watermark lags the largest timestamp seen so far by
                // 2 seconds. (For strictly ascending timestamps an AscendingTimestampExtractor
                // could be used here instead.)
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                            @Override
                            public long extractTimestamp(SensorReading element) {
                                // Flink expects timestamps in milliseconds. The reading's timestamp
                                // is multiplied by 1000, i.e. it is presumably stored in seconds
                                // -- TODO confirm against the data source.
                                return element.getTimestamp() * 1000L;
                            }
                        });

        // Event-time aggregation: for every sensor id, keep the reading with the minimum
        // temperature inside each 5-second tumbling window.
        SingleOutputStreamOperator<SensorReading> minTemp = dataStream
                .keyBy(SensorReading::getId)
                .timeWindow(Time.seconds(5))
                .minBy("temperature");

        // NOTE(review): the output label still says "maxTemp" although the stream carries the
        // minimum-temperature readings; it is left unchanged so the printed output stays the same.
        minTemp.print("maxTemp");

        env.execute("Window_EventTime");
    }
}
